Solutions/WithSecureElementsViaFunction/Data Connectors/WithSecureElementsAzureFunction/function_app.py (80 lines of code) (raw):
import logging
import os
import azure.functions as func
import requests
from azure.data.tables import TableServiceClient
from azure.identity import ClientSecretCredential
from azure.monitor.ingestion import LogsIngestionClient
from lib.azure_storage_table import StorageTableClient
from lib.log_ingestion_api import IngestionApiClient
from lib.withsecure_client import WithSecureClient
from lib.ws_connector import Connector
app = func.FunctionApp()
API_URL = "ELEMENTS_API_URL"
ENGINE = "ENGINE"
ENGINE_GROUP = "ENGINE_GROUP"
CLIENT_ID_KEY = "ELEMENTS_API_CLIENT_ID"
CLIENT_SECRET_KEY = "ELEMENTS_API_CLIENT_SECRET"
ENTRA_ID_KEY = "ENTRA_CLIENT_ID"
ENTRA_SECRET_KEY = "ENTRA_CLIENT_SECRET"
ENTRA_TENANT_KEY = "ENTRA_TENANT_ID"
CONNECTION_STRING = "STATE_TABLE_CS"
TABLE_NAME = "STATE_TABLE"
DATA_COLLECTION_ENDPOINT = "LOGS_ENDPOINT"
DCR_RULE_ID = "LOGS_DCR_RULE_ID"
DCR_STREAM = "LOGS_DCR_STREAM_NAME"
MAX_EVENTS_COUNT = "MAX_EVENTS_COUNT"
log = logging.getLogger(__name__)
@app.schedule(
schedule="0 * * * * *", arg_name="timer", run_on_startup=True, use_monitor=True
)
def upload_security_events(timer):
if timer.past_due:
log.info("The timer is past due!")
try:
elements_api_url = os.environ.get(API_URL)
state_conn_str = os.environ.get(CONNECTION_STRING)
state_table = os.environ.get(TABLE_NAME)
logs_endpoint = os.environ.get(DATA_COLLECTION_ENDPOINT)
dcr_rule_id = os.environ.get(DCR_RULE_ID)
dcr_stream = os.environ.get(DCR_STREAM)
client_id = os.environ.get(CLIENT_ID_KEY)
client_secret = os.environ.get(CLIENT_SECRET_KEY)
entra_client = os.environ.get(ENTRA_ID_KEY)
entra_secret = os.environ.get(ENTRA_SECRET_KEY)
entra_tenant = os.environ.get(ENTRA_TENANT_KEY)
max_events_count = os.environ.get(MAX_EVENTS_COUNT, 1000)
log.info("Secret values ok " + client_id)
storage_client = storage_table_client(state_conn_str, state_table)
ingestion_client = ingestion_api_client(
logs_endpoint,
dcr_rule_id,
dcr_stream,
entra_tenant,
entra_client,
entra_secret,
)
engine = os.environ.get(ENGINE, "default")
engine_group = os.environ.get(ENGINE_GROUP, "default")
withsecure_client = WithSecureClient(
elements_api_url, client_id, client_secret, engine, engine_group, requests, max_events_count
)
connector = Connector(
storage_client=storage_client,
ingestion_client=ingestion_client,
withsecure_client=withsecure_client,
)
connector.execute()
except Exception:
log.exception("Execution error")
def storage_table_client(connection_str, table_name):
table_service_client = TableServiceClient.from_connection_string(connection_str)
return StorageTableClient(table_service_client.get_table_client(table_name))
def ingestion_api_client(
api_url, dcr_rule_id, dcr_stream, entra_tenant, entra_client, entra_secret
):
creds = ClientSecretCredential(
tenant_id=entra_tenant, client_id=entra_client, client_secret=entra_secret
)
azure_client = LogsIngestionClient(api_url, credential=creds)
return IngestionApiClient(azure_client, dcr_rule_id, dcr_stream)